# general
  • g

    Garrett Weaver

    04/09/2025, 8:46 PM
    when using
    sort_merge
    , would there be any known reason why Ray might "hang" and stop scheduling tasks?
    c
    • 2
    • 3
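    For context, a minimal sketch of how a sort-merge join is requested on the Ray runner (assuming the strategy parameter on DataFrame.join; the paths and join key below are hypothetical):
    Copy code
    import daft

    # Hypothetical inputs; requesting the sort-merge strategy explicitly
    # triggers a distributed sort of both sides before the merge.
    left = daft.read_parquet("s3://bucket/left/*.parquet")
    right = daft.read_parquet("s3://bucket/right/*.parquet")

    joined = left.join(right, on="id", strategy="sort_merge")
    joined.collect()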
  • z

    Zapier

    04/10/2025, 7:33 PM
    What's Changed 🚀

    ✨ Features
    - feat: Set num threads for local runners @colin-ho (#4170)
    - feat: add strftime @universalmind303 (#4146)
    - feat: adds case-sensitive and case-normalized catalog and table identifiers [1/3] @rchowell (#4149)
    - feat(window): Add window partition execution @f4t4nt (#4097)
    - feat: add is_t and inner property methods to DataType @universalmind303 (#4141)
    - feat: add unix_timestamp function @universalmind303 (#4130)
    - feat: add io config for spark @universalmind303 (#4099)
    - feat: Implement flight server in rust @colin-ho (#4004)
    - feat(window): Add ExtractWindowFunction optimizer rule @f4t4nt (#4093)
    - feat(window): Add window function definitions and skeleton API @f4t4nt (#4082)
    - feat: add dt.day_of_year @universalmind303 (#4129)

    🐛 Bug Fixes
    - fix: Re-enable mypy @colin-ho (#4160)
    - fix: TimeUnit repr format @universalmind303 (#4164)
    - fix: Fix list aggregates on empty series @desmondcheongzx (#4155)
    - fix: sorting with nulls_first @universalmind303 (#4154)
    - fix: ignore hugging face in broken link workflow @rchowell (#4137)

    🚀 Performance
    - perf: Optimize shuffle cache @colin-ho (#4101)
    - perf(optimizer): anti/semi join pushdown rule @kevinzwang (#4132)

    📖 Documentation
    - docs: Fix tutorial notebooks @desmondcheongzx (#4161)
    - docs: documents iceberg integration @rchowell (#4070)

    👷 CI
    - ci: Native doc tests @colin-ho (#4142)

    🔧 Maintenance
    - chore: Update bug_report.yml to use native runner @colin-ho (#4169)
    - chore: simplify rust identifier to a string vector @rchowell (#4140)

    Full Changelog: https://github.com/Eventual-Inc/Daft/compare/v0.4.9...v0.4.10
    Release Notes: https://github.com/Eventual-Inc/Daft/releases/tag/v0.4.10
    🙌 1
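    For anyone trying the new expression functions from this release, a rough sketch (method names taken from the PR titles; exact signatures may differ):
    Copy code
    import datetime

    import daft
    from daft import col

    df = daft.from_pydict({"ts": [datetime.datetime(2025, 4, 10, 19, 33)]})

    # strftime (#4146) and day_of_year (#4129) are exposed under the dt expression namespace.
    df = df.with_column("formatted", col("ts").dt.strftime("%Y-%m-%d"))
    df = df.with_column("doy", col("ts").dt.day_of_year())
    df.show()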
  • k

    Kesav Kolla

    04/10/2025, 7:45 PM
    Hi, my use case is to connect to an RDBMS, execute a query, and write the results as Parquet files. The table I'm querying has 10B rows. I would like to know how to control partitioned reads and whether there is a way to pause/resume. The query execution and data transfer typically spans days, and if the process fails I don't want to rerun all 10B rows again. So I'm wondering how to use checkpoints for resuming, and secondly how to control the parallel reads.
    k
    • 2
    • 3
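    Not an official answer, but a sketch of the read-parallelism side using daft.read_sql with its partition_col and num_partitions parameters; the query, connection string, and paths below are made up:
    Copy code
    import daft

    # Daft splits the query into num_partitions range scans over partition_col,
    # which bounds how many concurrent reads hit the database.
    df = daft.read_sql(
        "SELECT * FROM big_table",           # hypothetical query
        "postgresql://user:pass@host/db",    # hypothetical connection string
        partition_col="id",
        num_partitions=64,
    )
    df.write_parquet("s3://bucket/output/")

    # For pause/resume, one hand-rolled option is a driver loop that issues
    # bounded queries per id range and skips ranges whose output already
    # exists, so a failure only reruns the in-flight chunk.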
  • w

    Wes Madrigal

    04/11/2025, 3:19 PM
    If you're doing multi-table data munging, we are now integrated with
    daft
    : https://github.com/wesmadrigal/GraphReduce
    โค๏ธ 6
  • c

    Cory Grinstead

    04/11/2025, 4:27 PM
    is it possible to enforce type constraints on udfs? for example:
    Copy code
    @daft.udf(return_dtype=str)
    def make_greeting(a,b, greeting: str ="hello"):
        return [f"{greeting}, {a} {b}" for a, b in zip(a, b)]
    
    df.select(make_greeting(
        col('first_name'), 
        col('last_name'), 
        lit(1) # <-------------------- any way to make this error out and enforce that it's actually a string 
    )).collect()
    k
    • 2
    • 1
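    One workaround sketch, assuming Series.datatype() returns the column's DataType: validate the argument inside the UDF at runtime and raise.
    Copy code
    import daft

    @daft.udf(return_dtype=daft.DataType.string())
    def make_greeting(a, b, greeting="hello"):
        # Manual guard: when greeting arrives as a column (e.g. lit(1)),
        # reject anything that is not a Utf8 column.
        if isinstance(greeting, daft.Series):
            if greeting.datatype() != daft.DataType.string():
                raise TypeError(f"greeting must be a string, got {greeting.datatype()}")
            greeting = greeting.to_pylist()[0]
        return [f"{greeting}, {x} {y}" for x, y in zip(a.to_pylist(), b.to_pylist())]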
  • e

    Elvn

    04/16/2025, 9:39 AM
    Hi folks, I am new to Daft and wanted to see how it is different from Spark. I am testing it on Databricks Cloud. I tried to connect to Unity Catalog but am getting an error. Let me know how to resolve it.
    n
    • 2
    • 6
  • a

    Andrew Fuqua

    04/16/2025, 1:34 PM
    Hi, I have a Daft dataframe
    df
    with a single column of type Map<utf8, utf8>. When I try to write it to parquet, I get an error:
    ArrowInvalid: Map keys must be annotated as required.
    Copy code
    print(df.schema())
    ╭─────────────────────────────┬─────────────────╮
    │ column_name                 ┆ type            │
    ╞═════════════════════════════╪═════════════════╡
    │ foo_map                     ┆ Map[Utf8: Utf8] │
    ╰─────────────────────────────┴─────────────────╯
    
    print(df.schema().to_pyarrow_schema())
    foo_map: map<large_string, large_string>
      child 0, entries: struct<key: large_string, value: large_string> not null
          child 0, key: large_string
          child 1, value: large_string
    For more context, I constructed the dataframe by reading from an existing parquet file and selecting only a map type column. Writing non-map columns from the source parquet to a new parquet file works fine. I've narrowed down the issue to columns that have map types. Any advice for how to proceed here?
    c
    c
    • 3
    • 8
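    One possible workaround sketch while this gets sorted out: round-trip through Arrow and cast the map column to a type whose key field is non-nullable before writing with pyarrow. Whether the cast is accepted depends on the Arrow version; the column and output file names come from the example above.
    Copy code
    import pyarrow as pa
    import pyarrow.parquet as pq

    # `df` is the Daft dataframe holding the Map[Utf8: Utf8] column.
    arrow_table = df.to_arrow()

    # pa.map_() marks the key field as non-nullable, which is what the
    # Parquet writer insists on.
    target_schema = pa.schema(
        [pa.field("foo_map", pa.map_(pa.large_string(), pa.large_string()))]
    )
    pq.write_table(arrow_table.cast(target_schema), "foo_map_only.parquet")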
  • u

    עמית גלעד

    04/16/2025, 2:26 PM
    Hello, I am starting a PoC with Daft + Ray. I created an EC2 instance manually and started Ray with the following command:
    Copy code
    ray start --head --port=6379 --node-ip-address 44.222.217.130 --dashboard-host 0.0.0.0
    I even checked that all is good with ray status:
    Copy code
    ======== Autoscaler status: 2025-04-16 14:23:06.641656 ========
    Node status
    ---------------------------------------------------------------
    Active:
     1 node_8da791ef7c8b8034fcb2fdd215c5aee700d6c6b8a9b2b095848a60bb
    Pending:
     (no pending nodes)
    Recent failures:
     (no failures)
    
    Resources
    ---------------------------------------------------------------
    Usage:
     0.0/48.0 CPU
     0B/259.13GiB memory
     0B/111.06GiB object_store_memory
    
    Demands:
     (no resource demands)
    I opened all ports from my local laptop to the machine and have access to the Ray dashboard. But when I try to run the example in the docs (https://www.getdaft.io/projects/docs/en/stable/distributed/)
    Copy code
    import daft
    
    daft.context.set_runner_ray(address="ray://<ray_address>:<port>")
    
    df = daft.from_pydict({
        "a": [3, 2, 5, 6, 1, 4],
        "b": [True, False, False, True, True, False]
    })
    
    print(df)
    But I keep getting the following error:
    Copy code
    Traceback (most recent call last):
      File "/Users/amit/projects/lakesphere/Glacierops/ray.py", line 2, in <module>
        import ray
      File "/Users/amit/projects/lakesphere/Glacierops/ray.py", line 9, in <module>
        daft.context.set_runner_ray(address="<ray://44.222.217.130>")
      File "/Users/amit/projects/lakesphere/Glacierops/venv/lib/python3.12/site-packages/daft/context.py", line 84, in set_runner_ray
        py_ctx = _set_runner_ray(
                 ^^^^^^^^^^^^^^^^
    RuntimeError: Cannot set runner more than once
    Any help would be appreciated 🙂
    c
    c
    • 3
    • 9
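    Worth noting: the traceback shows the script itself is named ray.py, so import ray re-imports the script and runs it a second time, which is one way to end up with "Cannot set runner more than once". A minimal sketch with a non-conflicting file name (Ray Client listens on port 10001 by default, not the 6379 GCS port):
    Copy code
    # connect_daft.py  (hypothetical name; anything except ray.py or daft.py)
    import daft

    # Set the runner exactly once, before building any dataframes.
    daft.context.set_runner_ray(address="ray://44.222.217.130:10001")

    df = daft.from_pydict({
        "a": [3, 2, 5, 6, 1, 4],
        "b": [True, False, False, True, True, False],
    })
    print(df.collect())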
  • g

    Garrett Weaver

    04/17/2025, 12:16 AM
    Anyone know how to quiet these logs? Getting spammed with the below on the Ray runner:
    Copy code
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    INFO:daft_io.stats:IOStatsContext: PyMicroPartition::to_record_batch, Gets: 0, Heads: 0, Lists: 0, BytesRead: 0, AvgGetSize: 0, BytesUploaded: 0, AvgPutSize: 0
    c
    • 2
    • 2
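    One option: the prefix INFO:daft_io.stats: is the standard Python logging format, so raising that logger's level should silence these. On the Ray runner the same call may need to run on the workers as well (for example via runtime_env or a worker setup hook).
    Copy code
    import logging

    # The log prefix "INFO:daft_io.stats:" gives away the logger name.
    logging.getLogger("daft_io.stats").setLevel(logging.WARNING)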
  • y

    Yufan

    04/17/2025, 9:08 AM
    Hey folks, I'd love to get some input from the Daft team on
    file writes with custom name templates
    . Frameworks like TensorFlow impose a requirement on the output file name (e.g., https://www.tensorflow.org/datasets/api_docs/python/tfds/core/ShardedFileTemplate) in
    ${SHARD_INDEX}-of-${NUM_SHARDS}
    format, but it looks like Daft currently only supports the https://github.com/Eventual-Inc/Daft/blob/260a408fd6917ce1b09aa83b835c132510be3271/daft/recordbatch/recordbatch_io.py#L535 pattern. I'm not able to use the
    to_ray_dataset
    converter as it requires full materialization atm, so I just wanted to think out loud and see if you have any clues 😄
    c
    • 2
    • 12
  • y

    Yufan

    04/17/2025, 10:10 PM
    One more question: what's the best way in Daft to do column prefetching? 🧵
    c
    • 2
    • 3
  • y

    Yufan

    04/21/2025, 5:13 PM
    Hey folks, I ran into the error below when trying to read a bulk of Parquet files into Daft with
    daft.read_parquet
    , just wondering if there's any way to configure retries for this issue. It would be great if I could get some guidance on where to patch it; I'm able to build Daft locally and happy to try any patches 😄
    Copy code
    Error Type: TASK_EXECUTION_EXCEPTION
    
    User exception:
    ceptions.ByteStreamError: Io error: Cached error: Misc Transient error trying to read <s3://foo.parquet>
    Details:
    DispatchFailure(DispatchFailure { source: ConnectorError { kind: Other(None), source: hyper::Error(Connect, Ssl(Error { code: ErrorCode(5), cause: None }, X509VerifyResult { code: 0, error: "ok" })), connection: Unknown } })
    j
    k
    • 3
    • 12
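    A sketch of the retry knobs worth trying first, assuming the num_tries and retry_initial_backoff_ms parameters on daft.io.S3Config (the path is a stand-in):
    Copy code
    import daft
    from daft.io import IOConfig, S3Config

    # More aggressive retries for transient connect/SSL failures on S3 reads.
    io_config = IOConfig(s3=S3Config(num_tries=10, retry_initial_backoff_ms=1000))

    df = daft.read_parquet("s3://bucket/path/*.parquet", io_config=io_config)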
  • c

    ChanChan Mao

    04/21/2025, 5:26 PM
    The complete latest docs version is now live at https://www.getdaft.io/projects/docs/en/latest/ 🎉 Once we cut the release this week, it'll be available as the stable version. Let us know if you have any feedback!
    🙌 3
    daft party 1
    🔥 4
    r
    j
    • 3
    • 2
  • a

    Andrew Fuqua

    04/21/2025, 10:59 PM
    Does the read splitting mentioned in this blog post work in the native runner? I'm running into resource exhaustion when reading a single 2.5GB parquet file and attempting to write it back out repartitioned across 30 buckets using iceberg writer. This works for smaller files (tested with 200MB). I've set the daft execution config to the same values as in the post. The node has 32GB RAM which seems like enough for the task, but it is actually quickly exhausted (within 2 minutes), same for swap. Any other params I could tune to make this work? Would a local Ray cluster handle this task within the same resources?
    c
    • 2
    • 1
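    For reference, a hedged sketch of the scan-task splitting knobs that post refers to (parameter names assumed from daft.set_execution_config; the values and path are illustrative):
    Copy code
    import daft

    # Smaller scan tasks mean each worker holds less of the 2.5GB file in memory at once.
    daft.set_execution_config(
        scan_tasks_min_size_bytes=64 * 1024 * 1024,
        scan_tasks_max_size_bytes=256 * 1024 * 1024,
    )

    df = daft.read_parquet("big_file.parquet")   # hypothetical path
    # df.write_iceberg(iceberg_table)            # iceberg_table: a pyiceberg Table handle (not shown)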
  • e

    Everett Kleven

    04/24/2025, 4:45 PM
    Is anyone else experiencing strange scrolling behavior on the docs? It seems to have started post-update. If you scroll down, it automatically scrolls back to the top.
    Screen Recording 2025-04-24 at 11.39.28 AM.mov
    c
    d
    +2
    • 5
    • 17
  • y

    Yufan

    04/25/2025, 3:56 PM
    Hey folks, I want to get some input from the experts to see if this pattern is something Daft could support 🧵
    c
    • 2
    • 4
  • a

    Andrew Kursar

    04/25/2025, 9:29 PM
    Hello! The recent 0.4.11 Daft release included a <=16 pyarrow constraint (see https://github.com/Eventual-Inc/Daft/pull/4225), but the latest pyiceberg requires >=17 (see https://github.com/apache/iceberg-python/blob/pyiceberg-0.9.0/pyproject.toml#L64). Does anyone know what the newly added max bound in Daft is about? Or, more directly, are there any issues to watch that are blocking the use of later pyarrow releases? I've been using pyiceberg 0.9.0 with Daft 0.4.10 but now can't upgrade Daft without downgrading pyiceberg.
    c
    • 2
    • 3
  • y

    yashovardhan chaturvedi

    05/02/2025, 2:34 PM
    Hey folks, does Daft have something like https://fastht.ml/docs/#getting-help-from-ai or https://fastht.ml/docs/llms-ctx.txt that can be fed to LLMs? It might make using Daft more productive with Cursor etc.
    👀 1
    ❤️ 2
    n
    d
    • 3
    • 5
  • n

    Neil Wadhvana

    05/04/2025, 1:55 AM
    Hey guys, I'd like to do something like this in
    daft
    without needing to specify each column (since I can have anywhere from 1 to 5). Is there another syntax that would work? This is not working as is:
    Copy code
    from typing import Dict, List

    import daft
    import numpy as np

    @daft.udf(return_dtype=daft.DataType.python())
    def mean_ensemble(*depth_value_series: daft.Series) -> List[Dict[str, np.ndarray]]:
        """Apply mean ensemble to depth maps."""
        depth_value_lists = [series.to_pylist() for series in depth_value_series]
        reduced_depth_maps: List[Dict[str, np.ndarray]] = []
        for depth_value_list in depth_value_lists:
            # Calculate mean and standard deviation across all depth maps in the list
            stacked_depths = np.stack(depth_value_list, axis=0)
            mean_depth = np.mean(stacked_depths, axis=0)
            std_depth = np.std(stacked_depths, axis=0)
            reduced_depth_maps.append(
                {
                    "mean": mean_depth,
                    "std": std_depth,
                }
            )
    
        return reduced_depth_maps
    c
    • 2
    • 4
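    Since the UDF above already accepts *args, one option is to build the expression list on the caller side and unpack it, so the number of depth columns never has to be hard-coded (the column names here are hypothetical):
    Copy code
    import daft
    from daft import col

    # df: the dataframe holding the depth columns (1 to 5 of them in practice).
    depth_cols = ["depth_1", "depth_2", "depth_3"]
    df = df.with_column("ensemble", mean_ensemble(*[col(c) for c in depth_cols]))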
  • g

    Garrett Weaver

    05/14/2025, 1:05 PM
    👋 Any timeline for when window functions will be available for the Ray runner?
    c
    • 2
    • 1
  • y

    Yufan

    05/21/2025, 11:29 PM
    Hey folks, I'd like your input on what's the most efficient way of resizing partitions.
    k
    c
    • 3
    • 8
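    For anyone searching later, a sketch of the two relevant calls (the partition counts, path, and key column are made up):
    Copy code
    import daft

    df = daft.read_parquet("s3://bucket/data/*.parquet")      # hypothetical path

    # into_partitions() splits or coalesces to a target count without a full shuffle;
    # repartition() hash-shuffles, optionally clustering rows by key columns.
    df_resized = df.into_partitions(128)
    df_clustered = df.repartition(128, daft.col("user_id"))   # hypothetical key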
  • y

    Yuri Gorokhov

    05/23/2025, 9:29 PM
    I am trying to explode on a column that is a list of structs (it's a fairly nested schema) and encountering this error:
    Copy code
    Attempting to downcast Map { key: Utf8, value: List(Utf8) } to \"daft_core::array::list_array::ListArray\"
    Wondering if someone has seen this before?
    c
    • 2
    • 5
  • g

    Giridhar Pathak

    05/25/2025, 1:37 AM
    Hey folks, I'm getting a weird TypeError when reading from an Iceberg table:
    Copy code
    TypeError: pyarrow.lib.large_list() takes no keyword arguments
    the code:
    Copy code
    table = catalog.load_table(table)
        return df.read_iceberg(table)
    has anyone experienced this before?
    • 1
    • 1
  • g

    Giridhar Pathak

    05/25/2025, 10:18 PM
    I'm querying an Iceberg table from a Jupyter notebook (backed by 12GB RAM and 4 CPUs)
    Copy code
    daft.read_table("platform.messages").filter("event_time > TIMESTAMP '2025-05-24T00:00:00Z'").limit(5).show()
    Running this makes the process crash; it looks like memory goes through the roof. Not sure if it's trying to read the whole table into memory. Pre-materialization, I can get the schema just fine.
    c
    d
    • 3
    • 29
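    One thing worth checking before materializing: whether the filter and limit actually get pushed into the Iceberg scan. A quick sketch using explain (assuming the show_all flag):
    Copy code
    import daft

    df = (
        daft.read_table("platform.messages")
        .filter("event_time > TIMESTAMP '2025-05-24T00:00:00Z'")
        .limit(5)
    )

    # Print the optimized plan; the filter and limit should show up as
    # pushdowns on the Iceberg scan rather than as separate operators.
    df.explain(show_all=True)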
  • e

    Everett Kleven

    05/28/2025, 2:15 PM
    https://github.com/JanKaul/iceberg-rust 👀
    👌 1
    k
    • 2
    • 6
  • y

    Yuri Gorokhov

    05/28/2025, 4:14 PM
    Is there an equivalent to pyspark's
    Copy code
    .dropDuplicates(subset: Optional[List[str]] = None)
    where you can specify which columns to consider?
    r
    k
    s
    • 4
    • 9
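    Not sure there is a direct subset argument, but one workaround sketch is to group on the subset columns and keep an arbitrary value for the rest (any_value is assumed to exist as an aggregation; min or max works as a fallback for orderable columns). Column names below are hypothetical:
    Copy code
    import daft
    from daft import col

    # df: the dataframe to de-duplicate.
    # Keep one row per (user_id, event_date) pair, regardless of the other columns.
    deduped = df.groupby("user_id", "event_date").agg(
        col("payload").any_value(),   # assumed aggregation; .min() as a fallback
    )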
  • p

    Pat Patterson

    05/29/2025, 11:37 PM
    Hi there - I'm trying out Daft after meeting @ChanChan Mao and @Sammy Sidhu at Data Council a few weeks ago. I got all the queries from my recent Iceberg and Backblaze B2 blog post working (see https://gist.github.com/metadaddy/ec9e645fa0929321b626d8be6e11162e). Performance in general is not great, but one query in particular is extremely slow:
    Copy code
    # How many records are in the current Drive Stats dataset?
    count, elapsed_time = time_collect(drivestats.count())
    print(f"Total record count: {count.to_pydict()['count'][0]} ({elapsed_time:.2f} seconds)")
    With the other systems I tested in my blog post, the equivalent query takes between a fraction of a second and 15 seconds. That Daft call to
    drivestats.count()
    takes 80 seconds. I'm guessing it's doing way more work than it needs to - reading the record counts from each of the 365 Parquet files rather than simply reading
    total-records
    from the most recent metadata file. Since
    SELECT COUNT(*)
    is such a common operation, I think it's worth short-circuiting the current behavior.
    c
    • 2
    • 8
  • g

    Giridhar Pathak

    06/03/2025, 2:17 PM
    Question on the Daft Native runtime 🧵
    c
    • 2
    • 5
  • e

    Everett Kleven

    06/03/2025, 2:59 PM
    Hey Daft squad, if I'm using a MemoryCatalog to track LanceDB tables, am I restricted to only using dataframes at the moment?
    r
    • 2
    • 1
  • p

    Pat Patterson

    06/06/2025, 10:47 PM
    Where does the work take place when I use Daft with Ray? For example, consider the following minimal code:
    Copy code
    import daft
    import ray
    from pyiceberg.catalog import load_catalog

    ray.init("ray://<head_node_host>:10001", runtime_env={"pip": ["daft"]})

    daft.context.set_runner_ray("ray://<head_node_host>:10001")

    catalog = load_catalog(
        'iceberg',
        **{
            'uri': 'sqlite:///:memory:',
            # configuration to access Backblaze B2's S3-compatible API such as
            # s3.endpoint, s3.region, etc
        }
    )

    catalog.create_namespace('default', {'location': 's3://my-bucket/'})
    table = catalog.register_table('default.drivestats', metadata_location)

    drivestats = daft.read_iceberg(table)

    result = drivestats.count().collect()
    print(f"Total record count: {result.to_pydict()['count'][0]}")
    Presumably, the code to read Parquet files from Backblaze B2 via the AWS SDK executes on the Ray cluster, so I have to either install the necessary packages there ahead of time or specify them, and environment variables, in
    runtime_env
    ? For example:
    Copy code
    ray.init("ray://<head_node_host>:10001", runtime_env={
        "pip": ["daft==0.5.2", "boto3==1.34.162", "botocore==1.34.162", ...etc...],
        "env_vars": {
            "AWS_ENDPOINT_URL": os.environ["AWS_ENDPOINT_URL"],
            "AWS_ACCESS_KEY_ID": os.environ["AWS_ACCESS_KEY_ID"],
            "AWS_SECRET_ACCESS_KEY": os.environ["AWS_SECRET_ACCESS_KEY"],
            ...etc...
        }
    })
    j
    • 2
    • 2